package com.leon.hom.transform.server;

import com.leon.hom.core.common.Constants;
import com.leon.hom.core.common.MessageType;
import com.leon.hom.core.config.ListenConfig;
import com.leon.hom.core.config.RootConfig;
import com.leon.hom.core.evn.EnvUtil;
import com.leon.hom.core.log.Loggers;
import com.leon.hom.core.utils.ProtostuffUtils;
import com.leon.hom.http.HttpClient;
import com.leon.hom.http.HttpServer;
import com.leon.hom.http.entity.HttpRequestParam;
import com.leon.hom.http.entity.HttpResponseParam;
import com.leon.hom.mqtt.PublishMqttClient;
import com.leon.hom.mqtt.ReceiveMqttClient;
import com.typesafe.config.ConfigBeanFactory;
import com.typesafe.config.ConfigFactory;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class TransformServer {

    private static final Map<Integer, HttpServer> portServerMap = new ConcurrentHashMap<>();

    private ReceiveMqttClient receiveMqttClient;

    private HttpClient httpClient;

    private PublishMqttClient publishMqttClient;

    @PostConstruct
    public void init() throws Exception {
        String confPath = EnvUtil.getConfFilePath();

        RootConfig rootConfig = null;
        try {
            rootConfig = ConfigBeanFactory.create(ConfigFactory.parseFile(new File(confPath)), RootConfig.class);
            Loggers.TRANSFORM.info("Load config: {}", confPath);
        } catch (Exception e) {
            Loggers.TRANSFORM.error("Load config: {} , error: {}", confPath, e.getMessage());
        }

        if (null == rootConfig) return;

        for (ListenConfig listenConfig : rootConfig.getListen()) {
            portServerMap.put(listenConfig.getPort(), new HttpServer(rootConfig.getTag(), listenConfig, this::handleHttpRequestMessage));
        }

        httpClient = new HttpClient(rootConfig.getRedirect());

        String topic = String.format("/%s/%s/+", Constants.HOM, rootConfig.getTag());

        publishMqttClient = new PublishMqttClient(rootConfig.getBroker());
        receiveMqttClient = new ReceiveMqttClient(rootConfig.getBroker(), topic, (topic1, mqttMessage) -> {
            String[] split = topic1.split("/");
            String type = split[split.length - 1];
            if (MessageType.REQUEST.getType().equals(type)) {
                handleMqttRequestMessage(mqttMessage);
            } else if (MessageType.RESPONSE.getType().equals(type)) {
                handleMqttResponseMessage(mqttMessage);
            }
        });
    }

    @PreDestroy
    public void close() throws Exception {
        portServerMap.forEach((port, server) -> server.close());
        receiveMqttClient.close();
        publishMqttClient.close();
    }

    //http 监听request消息 封装成mqtt publish
    public void handleHttpRequestMessage(HttpRequestParam requestParam) {
        String topic = String.format("/%s/%s/%s", Constants.HOM, requestParam.getTargetTag(), MessageType.REQUEST.getType());
        publishMqttClient.publish(topic, ProtostuffUtils.serialize(requestParam));
    }


    //mqtt 监听request消息 封装成http 发出请求 得到响应后 调用handleHttpResponseMessage
    public void handleMqttRequestMessage(MqttMessage mqttMessage) {
        HttpRequestParam requestParam = ProtostuffUtils.deserialize(mqttMessage.getPayload(), HttpRequestParam.class);
        httpClient.doRequest(requestParam, this::handleHttpResponseMessage);
    }


    //http 处理响应 封装成mqtt publish
    public void handleHttpResponseMessage(HttpResponseParam responseParam) {
        String topic = String.format("/%s/%s/%s", Constants.HOM, responseParam.getTargetTag(), MessageType.RESPONSE.getType());
        publishMqttClient.publish(topic, ProtostuffUtils.serialize(responseParam));
    }

    //mqtt 监听response消息 获得session 后进行响应
    public void handleMqttResponseMessage(MqttMessage mqttMessage) {
        HttpResponseParam responseParam = ProtostuffUtils.deserialize(mqttMessage.getPayload(), HttpResponseParam.class);
        HttpServer httpServer = portServerMap.get(responseParam.getSourcePort());
        if (null != httpServer) {
            httpServer.reply(responseParam);
        }
    }


}
